// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <stddef.h>

#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/test/test_simple_task_runner.h"
#include "content/browser/streams/stream.h"
#include "content/browser/streams/stream_read_observer.h"
#include "content/browser/streams/stream_register_observer.h"
#include "content/browser/streams/stream_registry.h"
#include "content/browser/streams/stream_write_observer.h"
#include "net/base/net_errors.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace content {

class StreamTest : public testing::Test {
public:
    StreamTest()
        : producing_seed_key_(0)
    {
    }

    void SetUp() override { registry_.reset(new StreamRegistry()); }

    // Create a new IO buffer of the given |buffer_size| and fill it with random
    // data.
    scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size)
    {
        scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
        char* bufferp = buffer->data();
        for (size_t i = 0; i < buffer_size; i++)
            bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
        ++producing_seed_key_;
        return buffer;
    }

protected:
    base::MessageLoop message_loop_;
    std::unique_ptr<StreamRegistry> registry_;

private:
    int producing_seed_key_;
};

class TestStreamReader : public StreamReadObserver {
public:
    TestStreamReader()
        : buffer_(new net::GrowableIOBuffer())
    {
    }
    ~TestStreamReader() override { }

    void Read(Stream* stream)
    {
        const size_t kBufferSize = 32768;
        scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));

        int bytes_read = 0;
        while (true) {
            Stream::StreamState state = stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read);
            switch (state) {
            case Stream::STREAM_HAS_DATA:
                // TODO(tyoshino): Move these expectations to the beginning of Read()
                // method once Stream::Finalize() is fixed.
                EXPECT_FALSE(completed_);
                break;
            case Stream::STREAM_COMPLETE:
                completed_ = true;
                status_ = stream->GetStatus();
                return;
            case Stream::STREAM_EMPTY:
                EXPECT_FALSE(completed_);
                return;
            case Stream::STREAM_ABORTED:
                aborted_ = true;
                EXPECT_FALSE(completed_);
                return;
            }
            size_t old_capacity = buffer_->capacity();
            buffer_->SetCapacity(old_capacity + bytes_read);
            memcpy(buffer_->StartOfBuffer() + old_capacity,
                buffer->data(), bytes_read);
        }
    }

    void OnDataAvailable(Stream* stream) override { Read(stream); }

    scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }

    bool completed() const { return completed_; }
    bool aborted() const { return aborted_; }
    int status() const { return status_; }

private:
    scoped_refptr<net::GrowableIOBuffer> buffer_;
    bool completed_ = false;
    bool aborted_ = false;
    int status_ = 0;
};

class TestStreamWriter : public StreamWriteObserver {
public:
    TestStreamWriter() { }
    ~TestStreamWriter() override { }

    void Write(Stream* stream,
        scoped_refptr<net::IOBuffer> buffer,
        size_t buffer_size)
    {
        stream->AddData(buffer, buffer_size);
    }

    void OnSpaceAvailable(Stream* stream) override { }

    void OnClose(Stream* stream) override { }
};

class TestStreamObserver : public StreamRegisterObserver {
public:
    TestStreamObserver(const GURL& url, StreamRegistry* registry)
        : url_(url)
        , registry_(registry)
        , registered_(false)
        , stream_(nullptr)
    {
        registry->SetRegisterObserver(url, this);
    }
    ~TestStreamObserver() override { registry_->RemoveRegisterObserver(url_); }
    void OnStreamRegistered(Stream* stream) override
    {
        registered_ = true;
        stream_ = stream;
    }
    bool registered() const { return registered_; }
    Stream* stream() const { return stream_; }

private:
    const GURL url_;
    StreamRegistry* registry_;
    bool registered_;
    Stream* stream_;
};

TEST_F(StreamTest, SetAndRemoveRegisterObserver)
{
    TestStreamWriter writer1;
    TestStreamWriter writer2;
    GURL url1("blob://stream1");
    GURL url2("blob://stream2");
    std::unique_ptr<TestStreamObserver> observer1(
        new TestStreamObserver(url1, registry_.get()));
    std::unique_ptr<TestStreamObserver> observer2(
        new TestStreamObserver(url2, registry_.get()));
    scoped_refptr<Stream> stream1(new Stream(registry_.get(), &writer1, url1));
    EXPECT_TRUE(observer1->registered());
    EXPECT_EQ(observer1->stream(), stream1.get());
    EXPECT_FALSE(observer2->registered());

    observer2.reset();
    scoped_refptr<Stream> stream2(new Stream(registry_.get(), &writer2, url2));
}

TEST_F(StreamTest, SetReadObserver)
{
    TestStreamReader reader;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(
        new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));
}

TEST_F(StreamTest, SetReadObserver_SecondFails)
{
    TestStreamReader reader1;
    TestStreamReader reader2;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(
        new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader1));
    EXPECT_FALSE(stream->SetReadObserver(&reader2));
}

TEST_F(StreamTest, SetReadObserver_TwoReaders)
{
    TestStreamReader reader1;
    TestStreamReader reader2;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(
        new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader1));

    // Once the first read observer is removed, a new one can be added.
    stream->RemoveReadObserver(&reader1);
    EXPECT_TRUE(stream->SetReadObserver(&reader2));
}

TEST_F(StreamTest, Stream)
{
    TestStreamReader reader;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(
        new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    const int kBufferSize = 1000000;
    scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    writer.Write(stream.get(), buffer, kBufferSize);
    stream->Finalize(net::OK);
    base::RunLoop().RunUntilIdle();
    EXPECT_TRUE(reader.completed());
    EXPECT_EQ(net::OK, reader.status());

    ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
    for (int i = 0; i < kBufferSize; i++)
        EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
}

TEST_F(StreamTest, Abort)
{
    TestStreamReader reader;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    stream->Abort();
    base::RunLoop().RunUntilIdle();
    EXPECT_FALSE(reader.completed());
    EXPECT_TRUE(reader.aborted());
}

TEST_F(StreamTest, Error)
{
    TestStreamReader reader;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    stream->Finalize(net::ERR_ACCESS_DENIED);
    base::RunLoop().RunUntilIdle();
    EXPECT_TRUE(reader.completed());
    EXPECT_EQ(net::ERR_ACCESS_DENIED, reader.status());
}

// Test that even if a reader receives an empty buffer, once TransferData()
// method is called on it with |source_complete| = true, following Read() calls
// on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this
// guarantees that Reader::Read() call returns only STREAM_HAS_DATA
// or STREAM_COMPLETE in |data_available_callback_| call corresponding to
// Writer::Close().
TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty)
{
    TestStreamReader reader;
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(
        new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    const int kBufferSize = 0;
    scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    stream->AddData(buffer, kBufferSize);
    stream->Finalize(net::OK);
    base::RunLoop().RunUntilIdle();
    EXPECT_TRUE(reader.completed());
    EXPECT_EQ(0, reader.buffer()->capacity());
    EXPECT_EQ(net::OK, reader.status());
}

TEST_F(StreamTest, GetStream)
{
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer, url));

    scoped_refptr<Stream> stream2 = registry_->GetStream(url);
    ASSERT_EQ(stream1, stream2);
}

TEST_F(StreamTest, GetStream_Missing)
{
    TestStreamWriter writer;

    GURL url1("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer, url1));

    GURL url2("blob://stream2");
    scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    ASSERT_FALSE(stream2.get());
}

TEST_F(StreamTest, CloneStream)
{
    TestStreamWriter writer;

    GURL url1("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer, url1));

    GURL url2("blob://stream2");
    ASSERT_TRUE(registry_->CloneStream(url2, url1));
    scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    ASSERT_EQ(stream1, stream2);
}

TEST_F(StreamTest, CloneStream_Missing)
{
    TestStreamWriter writer;

    GURL url1("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer, url1));

    GURL url2("blob://stream2");
    GURL url3("blob://stream3");
    ASSERT_FALSE(registry_->CloneStream(url2, url3));
    scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    ASSERT_FALSE(stream2.get());
}

TEST_F(StreamTest, UnregisterStream)
{
    TestStreamWriter writer;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer, url));

    registry_->UnregisterStream(url);
    scoped_refptr<Stream> stream2 = registry_->GetStream(url);
    ASSERT_FALSE(stream2.get());
}

TEST_F(StreamTest, MemoryExceedMemoryUsageLimit)
{
    TestStreamWriter writer1;
    TestStreamWriter writer2;

    GURL url1("blob://stream");
    scoped_refptr<Stream> stream1(
        new Stream(registry_.get(), &writer1, url1));

    GURL url2("blob://stream2");
    scoped_refptr<Stream> stream2(
        new Stream(registry_.get(), &writer2, url2));

    const int kMaxMemoryUsage = 1500000;
    registry_->set_max_memory_usage_for_testing(kMaxMemoryUsage);

    const int kBufferSize = 1000000;
    scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    writer1.Write(stream1.get(), buffer, kBufferSize);
    // Make transfer happen.
    base::RunLoop().RunUntilIdle();

    writer2.Write(stream2.get(), buffer, kBufferSize);

    // Written data (1000000 * 2) exceeded limit (1500000). |stream2| should be
    // unregistered with |registry_|.
    EXPECT_EQ(nullptr, registry_->GetStream(url2).get());

    writer1.Write(stream1.get(), buffer, kMaxMemoryUsage - kBufferSize);
    // Should be accepted since stream2 is unregistered and the new data is not
    // so big to exceed the limit.
    EXPECT_FALSE(registry_->GetStream(url1).get() == nullptr);
}

TEST_F(StreamTest, UnderMemoryUsageLimit)
{
    TestStreamWriter writer;
    TestStreamReader reader;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    registry_->set_max_memory_usage_for_testing(1500000);

    const int kBufferSize = 1000000;
    scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    writer.Write(stream.get(), buffer, kBufferSize);

    // Run loop to make |reader| consume the data.
    base::RunLoop().RunUntilIdle();

    writer.Write(stream.get(), buffer, kBufferSize);

    EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
}

TEST_F(StreamTest, Flush)
{
    TestStreamWriter writer;
    TestStreamReader reader;

    GURL url("blob://stream");
    scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
    EXPECT_TRUE(stream->SetReadObserver(&reader));

    // If the written data size is smaller than ByteStreamWriter's (total size /
    // kFractionBufferBeforeSending), StreamReadObserver::OnDataAvailable is not
    // called.
    const int kBufferSize = 1;
    scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    writer.Write(stream.get(), buffer, kBufferSize);

    // Run loop to make |reader| consume the data.
    base::RunLoop().RunUntilIdle();
    EXPECT_EQ(0, reader.buffer()->capacity());

    stream->Flush();

    // Run loop to make |reader| consume the data.
    base::RunLoop().RunUntilIdle();
    EXPECT_EQ(kBufferSize, reader.buffer()->capacity());

    EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
}

TEST_F(StreamTest, AbortPendingStream)
{
    TestStreamWriter writer;

    GURL url("blob://stream");
    registry_->AbortPendingStream(url);
    scoped_refptr<Stream> stream1(new Stream(registry_.get(), &writer, url));
    ASSERT_EQ(nullptr, registry_->GetStream(url).get());
}

} // namespace content
